# Construct store x group price indices from quarterly unit values, using the same weights and options as the posted-price indices
# JHL 

# Clean environment and initialize.
# Spell out `all.names` instead of relying on partial matching of `all`.
rm(list = ls(all.names = TRUE))

# Packages: install any that are missing, then attach the ones used below.
list.of.packages <- c("folderfun", "data.table", "bit64", "lubridate", "foreign", "dplyr", "ggplot2")
new.packages <- list.of.packages[!(list.of.packages %in% installed.packages()[, "Package"])]
if (length(new.packages) > 0) {
  # Download over HTTPS from the official CRAN cloud mirror (plain HTTP
  # package downloads can be tampered with in transit).
  install.packages(new.packages, repos = "https://cloud.r-project.org")
}

# library() stops with an error when a package is unavailable, whereas
# require() only returns FALSE and lets the script fail later, obscurely.
# Attach order is kept as-is because later packages mask earlier ones.
library(folderfun)
library(data.table)
library(lubridate)
library(parallel)
library(bit64)
library(foreign)
setNumericRounding(0)

# Setup paths ------------------------------------------------------------
# Register the "path_home" folder with folderfun and read the project root
# back through the generated ffpath_home() accessor.
setff("path_home")
path_home <- ffpath_home()

# Input and code locations
repository <- file.path(path_home, "dta/nielsen/sqg_posted_group")
dta_path <- file.path(path_home, "raw/nielsen")
do_path <- file.path(path_home, "do/data_prep/01_price_index")

# Price-index helper functions used by the rest of this script
PI_functions_opt <- file.path(do_path, "05_PIq_fns.R")
source(PI_functions_opt)

# Output locations. The current code generates roughly 40K x 114 = 4.6M
# files, so the target file system needs sufficient capacity.
fw_out <- file.path(path_home, "dta/nielsen/PIq_06_15/fix_weight/state_group")

store_out <- file.path(path_home, "dta/nielsen/PIq_06_15/PI_by_Store")

# Directories holding the per-file progress marker CSVs
load_marker <- file.path(path_home, "dta/nielsen/PIq_06_15/load_marker")
store_marker <- file.path(path_home, "dta/nielsen/PIq_06_15/store")
all_marker <- file.path(path_home, "dta/nielsen/PIq_06_15/all_done")
partial <- file.path(path_home, "dta/nielsen/PIq_06_15/partial")

	 # Create (once) and load an RData data table of the input file names.
# Columns of `file.table`:
#   file_name - full path of each file in `repository`
#   size_id   - position in the `ls -S` listing (1 = largest file)
#   id        - random permutation of 1..N (seed 10), used to spread work
#   state     - the part of the file name before the first "_"
# The table is cached in sqg_0615_files.RData so the listing runs only once.
if (!file.exists(sprintf("%s/sqg_0615_files.RData", do_path))) {

  # A system call is a much more efficient listing method than doing it in R.
  # `ls -S` sorts by size, so array jobs can be split by size. Ideal: only
  # run the largest files with bigmem2, then randomize the rest across broadwl.
  ptm <- proc.time()

  # List the directory by path rather than setwd()-ing into it, so the
  # session's working directory is left untouched.
  listing <- system(paste("ls -S", shQuote(repository)), intern = TRUE)
  if (length(listing) == 0) {
    stop(sprintf("No input files found in %s", repository), call. = FALSE)
  }

  file.table <- data.table(file_name = file.path(repository, listing))
  set.seed(10)
  # sample.int(.N) draws exactly what sample(1:N, N, replace = FALSE) did.
  file.table[, ':='(size_id = .I, id = sample.int(.N))]
  file.table[, state := tstrsplit(basename(file_name), "_", fixed = TRUE)[[1]]]

  # Save to RData for quick loading
  save(file.table, file = sprintf("%s/sqg_0615_files.RData", do_path))
  print(proc.time() - ptm)
}

load(sprintf("%s/sqg_0615_files.RData", do_path))
	 
# Get the index of this task in the SLURM job array. Sys.getenv() returns a
# string, so convert it to a number; it is NA when the variable is unset.
myID <- as.numeric(Sys.getenv("SLURM_ARRAY_TASK_ID"))

verbose <- TRUE

prev.time <- verbose.Print.Time(verbose, "List locations")

# Key (and sort) the table by the random id
setkey(file.table, id)

# Logical mask of the elements of `x` assigned to array task `task_id` when the
# unique values of `x` are cut at quantile(unique(x), probs).
# Intervals are half-open [q_k, q_k+1) except the last one, which is closed, so
# a value lying exactly on a cut point belongs to exactly one task. (Closed
# intervals on both ends made two concurrent tasks process such files.)
in.task.slice <- function(x, task_id, probs) {
  cuts <- quantile(unique(x), probs, names = FALSE)
  n_tasks <- length(cuts) - 1L
  if (length(task_id) != 1L || is.na(task_id) || task_id < 1 || task_id > n_tasks) {
    stop(sprintf("Array task id must be between 1 and %d (got '%s'); is SLURM_ARRAY_TASK_ID set?",
                 n_tasks, paste(task_id, collapse = ", ")), call. = FALSE)
  }
  lo <- cuts[[task_id]]
  hi <- cuts[[task_id + 1]]
  if (task_id == n_tasks) x >= lo & x <= hi else x >= lo & x < hi
}

# Run if broadwl: 1% slices of the random id, skipping the 118 largest files
# (size_id <= 118), presumably because those are run on bigmem2.
if (FALSE) {
  file.table <- file.table[in.task.slice(id, myID, seq(0, 1, by = 0.01))][size_id > 118]
}

# Run if bigmem2: 0.5% slices of the size rank
if (TRUE) {
  file.table <- file.table[in.task.slice(size_id, myID, seq(0, 1, by = 0.005))]
}

# Parse the state and group codes out of an input file path of the form
# ".../<state>_<group>.<ext>".
# Returns list(state = <numeric>, group = <numeric>); a part that cannot be
# parsed comes back as NA.
parse.state.group <- function(path) {
  parts <- strsplit(basename(path), split = "_", fixed = TRUE)[[1]]
  group_part <- strsplit(parts[2], split = ".", fixed = TRUE)[[1]][1]
  list(state = as.numeric(parts[1]), group = as.numeric(group_part))
}

# Compute store-level price indices for every state x group file listed in the
# global `file.table`, writing one CSV per element of the parallel.county()
# result to `store_out`.
#
# Progress is tracked with one-line marker CSVs so a killed array task can be
# resubmitted without redoing finished work:
#   load_marker/<state>_<group>.csv  - written when a file starts loading
#   store_marker/<state>_<group>.csv - written once the file needs no more
#                                      work: all store-level outputs saved, or
#                                      skipped (empty data, or fewer than 4
#                                      rows after geo.level.check())
#   all_marker/<state>_<group>.csv   - written at the end of a processed file
# A file is skipped entirely when both its load and store markers exist.
#
# Args:
#   core: passed through to parallel.county() (presumably the core count).
#   collapse_opt: FALSE (default) computes indices by store, re-subsetting the
#     data with fast.subset(); TRUE keeps the table from geo.level.check().
#   fix_weight_opt: currently unused; kept for backward compatibility.
#
# Relies on globals defined elsewhere: file.table, load_marker, store_marker,
# all_marker, store_out, fw_out, geo.level.check(), fast.subset(),
# parallel.county(), verbose.Print.Time().
# Returns NULL invisibly; called for its side effects.
produce.entire.output <- function(core = c(), collapse_opt = FALSE, fix_weight_opt = FALSE) {

  marker <- 1L # L assigns integer value instead of numerical, saves memory

  for (row_id in file.table$id) {

    # File name format: <state>_<group>.<ext> (state is the quantile used for
    # parallelization)
    in_file <- file.table[id == row_id, file_name]
    sg <- parse.state.group(in_file)
    state <- sg$state
    group <- sg$group

    # Progress markers
    INIT <- sprintf('%s/%d_%s.csv', load_marker, state, group)
    STORE <- sprintf('%s/%d_%s.csv', store_marker, state, group)
    ALL <- sprintf('%s/%d_%s.csv', all_marker, state, group)
    if (file.exists(INIT) && file.exists(STORE)) {
      cat(sprintf(' State : %d & group : %s entire skip', state, group))
      cat('\n')
      next
    }

    cat(sprintf('loading: %d state & %s group', state, group))
    cat('\n')
    write.csv(marker, file = INIT, row.names = FALSE)
    prev.time <- verbose.Print.Time(TRUE, sprintf('loading: %d state & %s group', state, group))

    ## Load raw data
    dt_state_save <- readRDS(in_file)

    ## Nothing to compute for an empty file: mark it done and move on
    if (nrow(dt_state_save) == 0) {
      cat(sprintf('State : %d & group %s: NULL_dt', state, group))
      cat('\n')
      write.csv(marker, file = STORE, row.names = FALSE)
      next
    }

    # Keep only locations that are not missing in any year
    dt_store <- geo.level.check(dt_state_save, "by_store")

    if (nrow(dt_store) < 4) {
      cat(sprintf('State : %d & group : %s < 4', state, group))
      cat('\n')
      write.csv(marker, file = STORE, row.names = FALSE)
      gc()
      next
    }

    # Process variables
    dt_state_save[, ':='(quantile = NULL)]
    dt_state_save[, Volume := Units * Price]

    # Fixed weights for this state x group go into the global environment
    # (assigned inside this function they would be lost). If no weights file
    # was created, assign a one-row all-zero placeholder `w_ft` so the index
    # code does not stop.
    fw_file <- sprintf('%s/%d_%s.RData', fw_out, state, group)
    if (file.exists(fw_file)) {
      load(fw_file, envir = .GlobalEnv)
    } else {
      assign("w_ft",
             data.table(store_code_uc = 0, upc = 0, upc_ver_uc = 0, q_fix = 0, s_fix = 0),
             envir = .GlobalEnv)
    }

    ## Subset by store (default); with collapse_opt = TRUE the table returned
    ## by geo.level.check() above is used as is
    if (!collapse_opt) {
      dt_store <- fast.subset(dt_state_save, "by_store")
    }

    cat('subset done & PI calc start ')
    cat('\n')
    prev.time <- verbose.Print.Time(TRUE, "subset done & PI calc start", prev.time)

    # Compute and save the store-level indices. The STORE marker is written
    # only after every output file has been saved, so a job killed mid-save
    # (time limit, out of memory) redoes this group on resubmission instead of
    # skipping it with partial output.
    if (!file.exists(STORE)) {
      Result <- parallel.county(dt_store, group, core)
      rm(dt_store)

      cat(sprintf('%s: saving started', group))
      cat('\n')
      for (out_name in names(Result)) {
        write.csv(Result[[out_name]],
                  file = sprintf('%s/%d_%s_%s.csv', store_out, state, group, out_name),
                  row.names = FALSE)
      }
      rm(Result)

      write.csv(marker, file = STORE, row.names = FALSE)
    }

    # Report progress
    cat(sprintf('%s: saving done', group))
    cat('\n')

    write.csv(marker, file = ALL, row.names = FALSE)

    gc()
  }

  invisible(NULL)
}

# Process every file assigned to this array task; core = 12 is passed through
# to the index computation.
produce.entire.output(core = 12)

cat("Quantile done")
cat("\n")




    





















